# orm/strategies.py
# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

"""sqlalchemy.orm.interfaces.LoaderStrategy
   implementations, and related MapperOptions."""
from __future__ import absolute_import

import collections
import itertools

from . import attributes
from . import exc as orm_exc
from . import interfaces
from . import loading
from . import path_registry
from . import properties
from . import query
from . import relationships
from . import unitofwork
from . import util as orm_util
from .base import _DEFER_FOR_STATE
from .base import _RAISE_FOR_STATE
from .base import _SET_DEFERRED_EXPIRED
from .context import _column_descriptions
from .context import ORMCompileState
from .context import ORMSelectCompileState
from .context import QueryContext
from .interfaces import LoaderStrategy
from .interfaces import StrategizedProperty
from .session import _state_session
from .state import InstanceState
from .util import _none_set
from .util import aliased
from .. import event
from .. import exc as sa_exc
from .. import inspect
from .. import log
from .. import sql
from .. import util
from ..sql import util as sql_util
from ..sql import visitors
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import Select


def _register_attribute(
    prop,
    mapper,
    useobject,
    compare_function=None,
    typecallable=None,
    callable_=None,
    proxy_property=None,
    active_history=False,
    impl_class=None,
    **kw
):
    """Set up attribute instrumentation for ``prop`` across a mapper
    hierarchy.

    For each mapper in ``mapper.self_and_descendants`` whose ``_props``
    entry for ``prop.key`` is this very property, and whose class manager
    does not already report an implementation for that key,
    ``attributes.register_attribute_impl()`` is invoked and every assembled
    listener hook is then called with the resulting descriptor.

    :param prop: the property to be instrumented.
    :param mapper: the mapper whose ``self_and_descendants`` is traversed.
    :param useobject: forwarded to the attribute implementation; when true,
     the single parent validator, cascade tracking and backref listener
     hooks are considered as well.
    :param kw: any further keyword arguments are forwarded unchanged to
     ``attributes.register_attribute_impl()``.

    ``proxy_property`` is accepted in the signature but is not consulted
    here.

    """

    uselist = useobject and prop.uselist

    # every hook is called as hook(descriptor, prop) once the attribute
    # implementation exists; list order is the order of invocation.
    hooks = []

    if useobject and prop.single_parent:
        hooks.append(single_parent_validator)

    validators = prop.parent.validators
    if prop.key in validators:
        fn, opts = validators[prop.key]

        def _validator_hook(desc, hooked_prop):
            return orm_util._validator_events(
                desc, hooked_prop.key, fn, **opts
            )

        hooks.append(_validator_hook)

    if useobject:
        hooks.append(unitofwork.track_cascade_events)

        # backref listeners need to be assembled after the
        # singleparentvalidator and the mapper validator
        backref = prop.back_populates
        if backref and prop._effective_sync_backref:

            def _backref_hook(desc, hooked_prop):
                return attributes.backref_listeners(desc, backref, uselist)

            hooks.append(_backref_hook)

    # a single MapperProperty is shared down a class inheritance
    # hierarchy, so attribute instrumentation and backref events are
    # established for each mapper down the hierarchy.
    #
    # "mapper" is typically the same as prop.parent, however this is not
    # strongly enforced; on a second configure_mappers() run it might not
    # be prop.parent, and a subclass mapper may arrive here before its
    # superclass mapper.  Since mappers may already be set up, each one
    # is checked individually.
    for m in mapper.self_and_descendants:
        if m._props.get(prop.key) is not prop:
            continue
        if m.class_manager._attr_has_impl(prop.key):
            continue

        track_parent = useobject and (
            prop.single_parent or prop.direction is interfaces.ONETOMANY
        )
        send_modified = not useobject or not prop.viewonly

        desc = attributes.register_attribute_impl(
            m.class_,
            prop.key,
            parent_token=prop,
            uselist=uselist,
            compare_function=compare_function,
            useobject=useobject,
            trackparent=track_parent,
            typecallable=typecallable,
            callable_=callable_,
            active_history=active_history,
            impl_class=impl_class,
            send_modified_events=send_modified,
            doc=prop.doc,
            **kw
        )

        for hook in hooks:
            hook(desc, prop)


@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
class UninstrumentedColumnLoader(LoaderStrategy):
    """Loader strategy for a MapperProperty that is not instrumented.

    The polymorphic_on argument of mapper() often results in this,
    if the argument is against the with_polymorphic selectable.

    The strategy adds its columns to the query being set up and
    contributes nothing at row-processing time.

    """

    __slots__ = ("columns",)

    def __init__(self, parent, strategy_key):
        super(UninstrumentedColumnLoader, self).__init__(parent, strategy_key)
        self.columns = self.parent_property.columns

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        **kwargs
    ):
        """Append each of this strategy's columns to ``column_collection``,
        translated through ``adapter.columns`` when an adapter is given.
        """
        for column in self.columns:
            target = adapter.columns[column] if adapter else column
            compile_state._append_dedupe_col_collection(
                target, column_collection
            )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Intentionally does nothing; no populators are added."""
        return None


@log.class_logger
@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
class ColumnLoader(LoaderStrategy):
    """Loading strategy for an instrumented, non-deferred
    :class:`.ColumnProperty`."""

    __slots__ = "columns", "is_composite"

    def __init__(self, parent, strategy_key):
        super(ColumnLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.columns = prop.columns
        self.is_composite = hasattr(prop, "composite_class")

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        check_for_adapt=False,
        **kwargs
    ):
        """Add this property's columns to ``column_collection`` and record
        the column to fetch in ``memoized_populators``.

        With an adapter and ``check_for_adapt`` set, a column that
        ``adapter.adapt_check_present()`` reports as ``None`` ends the
        method immediately, before ``memoized_populators`` is written.
        """
        for column in self.columns:
            if not adapter:
                adapted = column
            elif check_for_adapt:
                adapted = adapter.adapt_check_present(column)
                if adapted is None:
                    return
            else:
                adapted = adapter.columns[column]

            compile_state._append_dedupe_col_collection(
                adapted, column_collection
            )

        first = self.columns[0]
        memoized_populators[self.parent_property] = (
            adapter.columns[first] if adapter else first
        )

    def init_class_attribute(self, mapper):
        """Register the class-level attribute for this column property.

        Active history is turned on when the property asks for it, when
        the first column is a primary key column, or when the property is
        the one mapped to the mapper's ``version_id_col``.
        """
        self.is_class_level = True
        prop = self.parent_property
        first_column = self.columns[0]
        coltype = first_column.type

        # TODO: check all columns ?  check for foreign key as well?
        active_history = prop.active_history or first_column.primary_key
        if not active_history:
            version_col = mapper.version_id_col
            active_history = (
                version_col is not None
                and mapper._columntoproperty.get(version_col, None) is prop
            )

        _register_attribute(
            prop,
            mapper,
            useobject=False,
            compare_function=coltype.compare_values,
            active_history=active_history,
        )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Register a "quick" populator for the first of this property's
        columns that ``result`` can produce a getter for; if none can be
        found, mark the key under "expire" instead.
        """
        for candidate in self.columns:
            if adapter:
                candidate = adapter.columns[candidate]
            getter = result._getter(candidate, False)
            if getter:
                populators["quick"].append((self.key, getter))
                return

        # none of the columns is present in the row
        populators["expire"].append((self.key, True))


@log.class_logger
@properties.ColumnProperty.strategy_for(query_expression=True)
class ExpressionColumnLoader(ColumnLoader):
    """Column loader registered for the ``query_expression=True``
    strategy key.

    The expression rendered is the one found under the ``"expression"``
    key of the loader option's ``local_opts`` when present, otherwise the
    property's own columns provided at least one of them is something
    other than a SQL NULL.
    """

    def __init__(self, parent, strategy_key):
        super(ExpressionColumnLoader, self).__init__(parent, strategy_key)

        # the "default" expression mapped on the column is compared to
        # sql.null; when every column is NULL there is nothing to render
        # unless an expression arrives through the options.
        null = sql.null().label(None)
        self._have_default_expression = not all(
            c.compare(null) for c in self.parent_property.columns
        )

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        **kwargs
    ):
        """Add the selected expression column(s) to ``column_collection``
        and record the first one in ``memoized_populators``; does nothing
        when there is no expression to render.
        """
        if loadopt and "expression" in loadopt.local_opts:
            selected = [loadopt.local_opts["expression"]]
        elif self._have_default_expression:
            selected = self.parent_property.columns
        else:
            selected = None

        if selected is None:
            return

        for column in selected:
            target = adapter.columns[column] if adapter else column
            compile_state._append_dedupe_col_collection(
                target, column_collection
            )

        first = selected[0]
        memoized_populators[self.parent_property] = (
            adapter.columns[first] if adapter else first
        )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """When the loader option carries an ``"expression"``, register a
        "quick" populator if ``result`` has a getter for it, else mark the
        key under "expire".  Without such an option nothing is added.
        """
        if not (loadopt and "expression" in loadopt.local_opts):
            return

        expression = loadopt.local_opts["expression"]
        if adapter:
            expression = adapter.columns[expression]

        getter = result._getter(expression, False)
        if getter:
            populators["quick"].append((self.key, getter))
        else:
            populators["expire"].append((self.key, True))

    def init_class_attribute(self, mapper):
        """Register the class-level attribute, passing
        ``accepts_scalar_loader=False`` through to the registration.
        """
        self.is_class_level = True
        coltype = self.columns[0].type

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=coltype.compare_values,
            accepts_scalar_loader=False,
        )


@log.class_logger
@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
@properties.ColumnProperty.strategy_for(
    deferred=True, instrument=True, raiseload=True
)
@properties.ColumnProperty.strategy_for(do_nothing=True)
class DeferredColumnLoader(LoaderStrategy):
    """Loading strategy for a deferred :class:`.ColumnProperty`.

    The column is left out of the query unless one of the "undefer"
    conditions checked in :meth:`.setup_query` applies, in which case the
    work is handed to the property's non-deferred, instrumented strategy.
    """

    __slots__ = "columns", "group", "raiseload"

    def __init__(self, parent, strategy_key):
        super(DeferredColumnLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        if hasattr(prop, "composite_class"):
            raise NotImplementedError(
                "Deferred loading for composite types not implemented yet"
            )
        self.raiseload = self.strategy_opts.get("raiseload", False)
        self.columns = prop.columns
        self.group = prop.group

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Add a populator entry for this deferred column.

        Three outcomes are possible: delegate to the non-deferred strategy
        when a refresh names this key in ``_only_load_props``; append the
        class-level "expire" entry; or append a per-state "new" entry that
        installs the raise/deferred column loader.
        """

        # for a DeferredColumnLoader, this method is only used during a
        # "row processor only" query; see test_deferred.py ->
        # tests with "rowproc_only" in their name.  As of the 1.0 series,
        # loading._instance_processor doesn't use a "row processing"
        # function to populate columns, instead it uses data in the
        # "populators" dictionary.  Normally, the
        # DeferredColumnLoader.setup_query() sets up that data in the
        # "memoized_populators" dictionary and "create_row_processor()"
        # here is never invoked.

        refresh_this_key = False
        if context.refresh_state:
            only_load = context.query._compile_options._only_load_props
            if only_load and self.key in only_load:
                refresh_this_key = True

        if refresh_this_key:
            column_strategy = self.parent_property._get_strategy(
                (("deferred", False), ("instrument", True))
            )
            column_strategy.create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )
        elif self.is_class_level:
            populators["expire"].append((self.key, False))
        else:
            set_deferred_for_local_state = (
                self.parent_property._raise_column_loader
                if self.raiseload
                else self.parent_property._deferred_column_loader
            )
            populators["new"].append((self.key, set_deferred_for_local_state))

    def init_class_attribute(self, mapper):
        """Register the class-level attribute with ``_load_for_state`` as
        its loader callable and ``load_on_unexpire=False``.
        """
        self.is_class_level = True
        coltype = self.columns[0].type

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=coltype.compare_values,
            callable_=self._load_for_state,
            load_on_unexpire=False,
        )

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        only_load_props=None,
        **kw
    ):
        """Either render the column through the non-deferred strategy or
        record a deferral marker in ``memoized_populators``.

        The column is rendered when any of these holds: the statement is
        rendered for a subquery and the property renders in subqueries;
        the option has "undefer_pks" and the columns intersect the
        parent's ``_should_undefer_in_wildcard``; the option undefers this
        property's group; or the key is named in ``only_load_props``.
        """

        undefer = (
            compile_state.compile_options._render_for_subquery
            and self.parent_property._renders_in_subqueries
        )

        if not undefer and loadopt:
            undefer = (
                "undefer_pks" in loadopt.local_opts
                and set(self.columns).intersection(
                    self.parent._should_undefer_in_wildcard
                )
            ) or (
                self.group
                and loadopt.local_opts.get(
                    "undefer_group_%s" % self.group, False
                )
            )

        if not undefer:
            undefer = only_load_props and self.key in only_load_props

        if undefer:
            column_strategy = self.parent_property._get_strategy(
                (("deferred", False), ("instrument", True))
            )
            column_strategy.setup_query(
                compile_state,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                memoized_populators,
                **kw
            )
            return

        if self.is_class_level:
            marker = _SET_DEFERRED_EXPIRED
        elif self.raiseload:
            marker = _RAISE_FOR_STATE
        else:
            marker = _DEFER_FOR_STATE
        memoized_populators[self.parent_property] = marker

    def _load_for_state(self, state, passive):
        """Load this deferred attribute, along with the unmodified members
        of its group if it has one, for the given ``state``.

        Returns ``attributes.ATTR_EMPTY`` when the state has no key,
        ``attributes.PASSIVE_NO_RESULT`` when ``passive`` lacks
        ``SQL_OK``, and ``attributes.ATTR_WAS_SET`` after a load.

        Raises ``DetachedInstanceError`` if the state has no session and
        ``ObjectDeletedError`` if the load returns ``None``; with
        ``raiseload`` set, ``_invoke_raise_load()`` raises before any load
        is attempted.
        """
        if not state.key:
            return attributes.ATTR_EMPTY

        if not passive & attributes.SQL_OK:
            return attributes.PASSIVE_NO_RESULT

        localparent = state.manager.mapper

        if not self.group:
            candidates = [self.key]
        else:
            candidates = []
            for p in localparent.iterate_properties:
                if not isinstance(p, StrategizedProperty):
                    continue
                if not isinstance(p.strategy, DeferredColumnLoader):
                    continue
                if p.group == self.group:
                    candidates.append(p.key)

        # narrow the keys down to just those which have no history
        group = [k for k in candidates if k in state.unmodified]

        session = _state_session(state)
        if session is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "deferred load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        if self.raiseload:
            self._invoke_raise_load(state, passive, "raise")

        stmt = sql.select(localparent).set_label_style(
            LABEL_STYLE_TABLENAME_PLUS_COL
        )
        loaded = loading.load_on_ident(
            session,
            stmt,
            state.key,
            only_load_props=group,
            refresh_state=state,
        )
        if loaded is None:
            raise orm_exc.ObjectDeletedError(state)

        return attributes.ATTR_WAS_SET

    def _invoke_raise_load(self, state, passive, lazy):
        """Raise ``InvalidRequestError`` reporting that the attribute is
        unavailable because of ``raiseload=True``.
        """
        message = "'%s' is not available due to raiseload=True" % (self,)
        raise sa_exc.InvalidRequestError(message)


class LoadDeferredColumns(object):
    """serializable loader object used by DeferredColumnLoader

    Only the attribute key and the raiseload flag are stored; the
    property and its strategy are looked up from the state's mapper each
    time the object is called.
    """

    def __init__(self, key, raiseload=False):
        self.key = key
        self.raiseload = raiseload

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        """Locate the deferred strategy for ``self.key`` on the state's
        mapper and return the result of its ``_load_for_state()``.
        """
        prop = state.manager.mapper._props[self.key]

        strategy_key = (("deferred", True), ("instrument", True))
        if self.raiseload:
            strategy_key += (("raiseload", True),)

        strategy = prop._get_strategy(strategy_key)
        return strategy._load_for_state(state, passive)


class AbstractRelationshipLoader(LoaderStrategy):
    """Base for LoaderStrategies which deal with related objects.

    Copies ``mapper``, ``entity``, ``target`` and ``uselist`` from the
    parent property onto the strategy at construction time.
    """

    __slots__ = "mapper", "target", "uselist", "entity"

    def __init__(self, parent, strategy_key):
        super(AbstractRelationshipLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.mapper = prop.mapper
        self.entity = prop.entity
        self.target = prop.target
        self.uselist = prop.uselist


@log.class_logger
@relationships.RelationshipProperty.strategy_for(do_nothing=True)
class DoNothingLoader(LoaderStrategy):
    """Relationship loader that makes no change to the object's state.

    Compared to NoLoader, this loader does not initialize the
    collection/attribute to empty/none; the usual default LazyLoader will
    take effect.

    This class is registered under the ``do_nothing=True`` strategy key
    and defines no methods of its own.

    """


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="noload")
@relationships.RelationshipProperty.strategy_for(lazy=None)
class NoLoader(AbstractRelationshipLoader):
    """Loading strategy for a :class:`.RelationshipProperty` configured
    with ``lazy="noload"`` or ``lazy=None``.

    At row-processing time the attribute is set to an empty collection
    or to ``None`` rather than being loaded.

    """

    __slots__ = ()

    def init_class_attribute(self, mapper):
        """Register the class-level attribute, using the property's
        ``collection_class`` as the collection factory.
        """
        self.is_class_level = True
        prop = self.parent_property

        _register_attribute(
            prop,
            mapper,
            useobject=True,
            typecallable=prop.collection_class,
        )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Append a "new" populator that initializes the attribute without
        loading anything.
        """

        def _populate_empty(state, dict_, row):
            if not self.uselist:
                dict_[self.key] = None
            else:
                attributes.init_state_collection(state, dict_, self.key)

        populators["new"].append((self.key, _populate_empty))


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy=True)
@relationships.RelationshipProperty.strategy_for(lazy="select")
@relationships.RelationshipProperty.strategy_for(lazy="raise")
@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
    """Provide loading behavior for a :class:`.RelationshipProperty`
    with "lazy=True", that is loads when first accessed.

    """

    __slots__ = (
        "_lazywhere",
        "_rev_lazywhere",
        "_lazyload_reverse_option",
        "_order_by",
        "use_get",
        "is_aliased_class",
        "_bind_to_col",
        "_equated_columns",
        "_rev_bind_to_col",
        "_rev_equated_columns",
        "_simple_lazy_clause",
        "_raise_always",
        "_raise_on_sql",
    )

    def __init__(self, parent, strategy_key):
        """Build the lazy-load criteria for the relationship and decide
        whether a plain primary key lookup can be used.

        ``use_get`` is enabled when the target entity is not an aliased
        class, the relationship is not a list, and the forward lazy clause
        compares equal to the entity's "get" clause.
        """
        super(LazyLoader, self).__init__(parent, strategy_key)

        lazy_setting = self.strategy_opts["lazy"]
        self._raise_always = lazy_setting == "raise"
        self._raise_on_sql = lazy_setting == "raise_on_sql"

        self.is_aliased_class = inspect(self.entity).is_aliased_class

        join_condition = self.parent_property._join_condition

        forward = join_condition.create_lazy_clause()
        self._lazywhere, self._bind_to_col, self._equated_columns = forward

        reverse = join_condition.create_lazy_clause(reverse_direction=True)
        (
            self._rev_lazywhere,
            self._rev_bind_to_col,
            self._rev_equated_columns,
        ) = reverse

        order_by = self.parent_property.order_by
        if not order_by:
            self._order_by = None
        else:
            self._order_by = [
                sql_util._deep_annotate(elem, {"_orm_adapt": True})
                for elem in util.to_list(order_by)
            ]

        self.logger.info("%s lazy loading clause %s", self, self._lazywhere)

        # determine if our "lazywhere" clause is the same as the mapper's
        # get() clause.  then we can just use mapper.get()
        #
        # TODO: the "not self.uselist" can be taken out entirely; a m2o
        # load that populates for a list (very unusual, but is possible with
        # the API) can still set for "None" and the attribute system will
        # populate as an empty list.
        self.use_get = (
            not self.is_aliased_class
            and not self.uselist
            and self.entity._get_clause[0].compare(
                self._lazywhere,
                use_proxies=True,
                compare_keys=False,
                equivalents=self.mapper._equivalent_columns,
            )
        )

        if not self.use_get:
            return

        # give each equivalent column the same entry as the column it is
        # equivalent to; a snapshot of the keys is iterated because the
        # mapping grows inside the loop.
        equivalents = self.mapper._equivalent_columns
        equated = self._equated_columns
        for col in list(equated):
            if col not in equivalents:
                continue
            for equivalent_col in equivalents[col]:
                equated[equivalent_col] = equated[col]

        self.logger.info(
            "%s will use Session.get() to optimize instance loads", self
        )

    def init_class_attribute(self, mapper):
        """Register the class-level attribute with ``_load_for_state`` as
        its loader callable.

        The ``active_history`` / ``_deferred_history`` pair is chosen as
        follows: explicit ``active_history`` on the property wins; a
        relationship that is not a many-to-one using "get" takes active
        history under the legacy style and deferred history otherwise;
        a many-to-one using "get" takes neither.
        """
        self.is_class_level = True
        prop = self.parent_property

        legacy_style = bool(prop._legacy_inactive_history_style)
        simple_m2o_get = (
            prop.direction is interfaces.MANYTOONE and self.use_get
        )

        if prop.active_history:
            active_history, _deferred_history = True, False
        elif simple_m2o_get:
            active_history, _deferred_history = False, False
        else:
            active_history = legacy_style
            _deferred_history = not legacy_style

        _register_attribute(
            prop,
            mapper,
            useobject=True,
            callable_=self._load_for_state,
            typecallable=prop.collection_class,
            active_history=active_history,
            _deferred_history=_deferred_history,
        )

    def _memoized_attr__simple_lazy_clause(self):
        """Return ``(criterion, params)`` derived from the lazy clause.

        ``params`` is a list of ``(bind key, column, value)`` tuples: for
        a bind parameter whose identifying key is in ``_bind_to_col`` the
        column is given and the value is ``None``; for any other bind
        parameter that has no callable, the column is ``None`` and the
        parameter's own value is given.
        """
        criterion = sql_util._deep_annotate(
            self._lazywhere, {"_orm_adapt": True}
        )
        bind_to_col = self._bind_to_col
        params = []

        def _clear_unique(bindparam):
            bindparam.unique = False

        def _collect_param(bindparam):
            ident = bindparam._identifying_key
            if ident in bind_to_col:
                params.append((bindparam.key, bind_to_col[ident], None))
            elif bindparam.callable is None:
                params.append((bindparam.key, None, bindparam.value))

        # first pass: turn off "unique" on each bind parameter in place
        visitors.traverse(criterion, {}, {"bindparam": _clear_unique})

        # second pass: cloned traversal which gathers the parameter list
        criterion = visitors.cloned_traverse(
            criterion, {}, {"bindparam": _collect_param}
        )

        return criterion, params

    def _generate_lazy_clause(self, state, passive):
        """Return the lazy-load criterion together with its parameters.

        With ``state`` of ``None``, the criterion is passed through
        ``sql_util.adapt_criterion_to_null()`` for all parameter keys and
        that result is returned.  Otherwise a ``(criterion, params)`` pair
        is returned where column-bound parameters take their value from
        the state, using committed values when ``passive`` includes
        ``LOAD_AGAINST_COMMITTED``.
        """
        criterion, param_keys = self._simple_lazy_clause

        if state is None:
            return sql_util.adapt_criterion_to_null(
                criterion, [entry[0] for entry in param_keys]
            )

        mapper = self.parent_property.parent

        instance = state.obj()  # strong ref
        dict_ = attributes.instance_dict(instance)

        # switch off INIT_OK for the attribute lookups below
        if passive & attributes.INIT_OK:
            passive ^= attributes.INIT_OK

        if passive and passive & attributes.LOAD_AGAINST_COMMITTED:
            get_attr = mapper._get_committed_state_attr_by_column
        else:
            get_attr = mapper._get_state_attr_by_column

        params = {}
        for key, ident, value in param_keys:
            if ident is not None:
                value = get_attr(state, dict_, ident, passive)
            params[key] = value

        return criterion, params

    def _invoke_raise_load(self, state, passive, lazy):
        """Raise ``InvalidRequestError`` reporting that the attribute is
        unavailable because of the given ``lazy`` setting.
        """
        message = "'%s' is not available due to lazy='%s'" % (self, lazy)
        raise sa_exc.InvalidRequestError(message)

    def _load_for_state(self, state, passive, loadopt=None, extra_criteria=()):
        """Loader callable for this relationship attribute on ``state``.

        Depending on ``state`` and the flags in ``passive``, returns:

        * ``attributes.ATTR_EMPTY`` - the state has no identity key and
          either has no ``session_id`` or pending loads are not enabled.
        * ``attributes.PASSIVE_NO_RESULT`` - the ``passive`` flags do not
          permit the work needed to produce a value.
        * ``attributes.NEVER_SET``, ``None`` or
          ``attributes.ATTR_WAS_SET`` - short-circuit outcomes of the
          "use get" path below.
        * an instance located by ``session._identity_lookup()``.
        * otherwise, the return value of ``self._emit_lazyload()``.

        ``_invoke_raise_load()`` is called, and raises, for ``lazy="raise"``
        under the conditions commented below.  ``DetachedInstanceError`` is
        raised when the state has no session and ``NO_RAISE`` is not set.
        """
        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return attributes.ATTR_EMPTY

        pending = not state.key
        primary_key_identity = None

        # extra criteria on the loader option rule out the "get" shortcut
        use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)

        if (not passive & attributes.SQL_OK and not use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return attributes.PASSIVE_NO_RESULT

        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & attributes.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise.  This is also a
                # history-related flag
                not use_get
                or passive & attributes.RELATED_OBJECT_OK
            )
        ):

            self._invoke_raise_load(state, passive, "raise")

        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT

            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        # if we have a simple primary key load, check the
        # identity map without generating a Query at all
        if use_get:
            primary_key_identity = self._get_ident_for_use_get(
                session, state, passive
            )
            if attributes.PASSIVE_NO_RESULT in primary_key_identity:
                return attributes.PASSIVE_NO_RESULT
            elif attributes.NEVER_SET in primary_key_identity:
                return attributes.NEVER_SET

            # every element of the identity is in _none_set
            if _none_set.issuperset(primary_key_identity):
                return None

            if (
                self.key in state.dict
                and not passive & attributes.DEFERRED_HISTORY_LOAD
            ):
                return attributes.ATTR_WAS_SET

            # look for this identity in the identity map.  Delegate to the
            # Query class in use, as it may have special rules for how it
            # does this, including how it decides what the correct
            # identity_token would be for this identity.

            instance = session._identity_lookup(
                self.entity,
                primary_key_identity,
                passive=passive,
                lazy_loaded_from=state,
            )

            if instance is not None:
                if instance is attributes.PASSIVE_CLASS_MISMATCH:
                    return None
                else:
                    return instance
            elif (
                not passive & attributes.SQL_OK
                or not passive & attributes.RELATED_OBJECT_OK
            ):
                return attributes.PASSIVE_NO_RESULT

        # nothing short-circuited above; hand off to _emit_lazyload()
        return self._emit_lazyload(
            session,
            state,
            primary_key_identity,
            passive,
            loadopt,
            extra_criteria,
        )

    def _get_ident_for_use_get(self, session, state, passive):
        """Return a list holding, for each primary key column of the
        target mapper, the value of the equated column on ``state``.

        Committed values are read when ``passive`` includes
        ``LOAD_AGAINST_COMMITTED``; current values otherwise.
        """
        parent_mapper = state.manager.mapper

        if passive & attributes.LOAD_AGAINST_COMMITTED:
            fetch = parent_mapper._get_committed_state_attr_by_column
        else:
            fetch = parent_mapper._get_state_attr_by_column

        state_dict = state.dict

        identity = []
        for pk in self.mapper.primary_key:
            equated = self._equated_columns[pk]
            identity.append(
                fetch(state, state_dict, equated, passive=passive)
            )
        return identity

    @util.preload_module("sqlalchemy.orm.strategy_options")
    def _emit_lazyload(
        self,
        session,
        state,
        primary_key_identity,
        passive,
        loadopt,
        extra_criteria,
    ):
        strategy_options = util.preloaded.orm_strategy_options

        clauseelement = self.entity.__clause_element__()
        stmt = Select._create_raw_select(
            _raw_columns=[clauseelement],
            _propagate_attrs=clauseelement._propagate_attrs,
            _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
            _compile_options=ORMCompileState.default_compile_options,
        )
        load_options = QueryContext.default_load_options

        load_options += {
            "_invoke_all_eagers": False,
            "_lazy_loaded_from": state,
        }

        if self.parent_property.secondary is not None:
            stmt = stmt.select_from(
                self.mapper, self.parent_property.secondary
            )

        pending = not state.key

        # don't autoflush on pending
        if pending or passive & attributes.NO_AUTOFLUSH:
            stmt._execution_options = util.immutabledict({"autoflush": False})

        use_get = self.use_get

        if state.load_options or (loadopt and loadopt._extra_criteria):
            effective_path = state.load_path[self.parent_property]

            opts = tuple(state.load_options)

            if loadopt and loadopt._extra_criteria:
                use_get = False
                opts += (
                    orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
                )

            stmt._with_options = opts
        else:
            # this path is used if there are not already any options
            # in the query, but an event may want to add them
            effective_path = state.mapper._path_registry[self.parent_property]

        stmt._compile_options += {"_current_path": effective_path}

        if use_get:
            if self._raise_on_sql and not passive & attributes.NO_RAISE:
                self._invoke_raise_load(state, passive, "raise_on_sql")

            return loading.load_on_pk_identity(
                session, stmt, primary_key_identity, load_options=load_options
            )

        if self._order_by:
            stmt._order_by_clauses = self._order_by

        def _lazyload_reverse(compile_context):
            for rev in self.parent_property._reverse_property:
                # reverse props that are MANYTOONE are loading *this*
                # object from get(), so don't need to eager out to those.
                if (
                    rev.direction is interfaces.MANYTOONE
                    and rev._use_get
                    and not isinstance(rev.strategy, LazyLoader)
                ):
                    strategy_options.Load.for_existing_path(
                        compile_context.compile_options._current_path[
                            rev.parent
                        ]
                    ).lazyload(rev).process_compile_state(compile_context)

        stmt._with_context_options += (
            (_lazyload_reverse, self.parent_property),
        )

        lazy_clause, params = self._generate_lazy_clause(state, passive)

        execution_options = {
            "_sa_orm_load_options": load_options,
        }

        if (
            self.key in state.dict
            and not passive & attributes.DEFERRED_HISTORY_LOAD
        ):
            return attributes.ATTR_WAS_SET

        if pending:
            if util.has_intersection(orm_util._none_set, params.values()):
                return None

        elif util.has_intersection(orm_util._never_set, params.values()):
            return None

        if self._raise_on_sql and not passive & attributes.NO_RAISE:
            self._invoke_raise_load(state, passive, "raise_on_sql")

        stmt._where_criteria = (lazy_clause,)

        result = session.execute(
            stmt, params, execution_options=execution_options
        )

        result = result.unique().scalars().all()

        if self.uselist:
            return result
        else:
            l = len(result)
            if l:
                if l > 1:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for lazily-loaded attribute '%s' "
                        % self.parent_property
                    )

                return result[0]
            else:
                return None

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Append row populators that keep this attribute lazily loaded.

        One of three things happens:

        * if this strategy is not the class-level one, or the loader
          option carries extra criteria, a per-instance lazy callable is
          installed for newly loaded instances;
        * otherwise, if the load is a "populate existing" style load
          (``context.populate_existing`` or ``mapper.always_refresh``),
          the per-instance attribute state is reset so the class-level
          loader runs on next access;
        * otherwise nothing is appended.

        :param context: load context; ``populate_existing`` is read and
         the context is handed to ``loadopt._generate_extra_criteria()``.
        :param loadopt: loader option in effect for this attribute, or
         ``None``.
        :param mapper: mapper whose ``class_manager`` and
         ``always_refresh`` are consulted.
        :param populators: dictionary of populator lists; entries are
         appended to ``populators["new"]``.

        ``query_entity``, ``path``, ``result`` and ``adapter`` are part of
        the common strategy signature and are not used here.

        """
        key = self.key

        # loadopt may be None; test it before touching _extra_criteria so
        # that a non class-level strategy invoked without a loader option
        # gets "no extra criteria" instead of an AttributeError.
        has_extra_criteria = bool(loadopt and loadopt._extra_criteria)

        if not self.is_class_level or has_extra_criteria:
            # we are not the primary manager for this attribute
            # on this class - set up a
            # per-instance lazyloader, which will override the
            # class-level behavior.
            # this currently only happens when using a
            # "lazyload" option on a "no load"
            # attribute - "eager" attributes always have a
            # class-level lazyloader installed.
            if has_extra_criteria:
                extra_criteria = loadopt._generate_extra_criteria(context)
            else:
                extra_criteria = None

            set_lazy_callable = (
                InstanceState._instance_level_callable_processor
            )(
                mapper.class_manager,
                LoadLazyAttribute(key, self, loadopt, extra_criteria),
                key,
            )

            populators["new"].append((key, set_lazy_callable))
        elif context.populate_existing or mapper.always_refresh:

            def reset_for_lazy_callable(state, dict_, row):
                # we are the primary manager for this attribute on
                # this class - reset its
                # per-instance attribute state, so that the class-level
                # lazy loader is
                # executed when next referenced on this instance.
                # this is needed in
                # populate_existing() types of scenarios to reset
                # any existing state.
                state._reset(dict_, key)

            populators["new"].append((key, reset_for_lazy_callable))


class LoadLazyAttribute(object):
    """semi-serializable callable that runs a lazy load for one attribute.

    An instance records the attribute key, the key of the strategy that
    created it, the loader option and any extra criteria; calling it
    looks the strategy up again on the instance's mapper and delegates to
    ``_load_for_state()``.

    Historically this object travelled with instances that needed to run
    lazyloaders, so it had to be serializable to support cached
    instances.  That is no longer a general requirement, and the case
    where this object is used is exactly the case that can't be
    serialized easily: extra criteria present in the loader option.
    Such criteria refer to mapped entities and AliasedClass objects local
    to the current process, which would need to be matched up on
    deserialize, e.g. the sqlalchemy.ext.serializer approach.

    """

    def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
        # only the strategy's key is kept, not the strategy object
        self.strategy_key = initiating_strategy.strategy_key
        self.key = key
        self.loadopt = loadopt
        self.extra_criteria = extra_criteria

    def __getstate__(self):
        """Return the picklable state; extra criteria are dropped."""
        has_criteria = self.extra_criteria is not None
        if has_criteria:
            util.warn(
                "Can't reliably serialize a lazyload() option that "
                "contains additional criteria; please use eager loading "
                "for this case"
            )

        state = {
            "key": self.key,
            "strategy_key": self.strategy_key,
            "loadopt": self.loadopt,
            "extra_criteria": (),
        }
        return state

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        """Run the lazy load for ``state`` using the recorded strategy."""
        prop = state.manager.mapper._props[self.key]
        loader = prop._strategies[self.strategy_key]

        return loader._load_for_state(
            state,
            passive,
            loadopt=self.loadopt,
            extra_criteria=self.extra_criteria,
        )


class PostLoader(AbstractRelationshipLoader):
    """A relationship loader that emits a second SELECT statement."""

    def _check_recursive_postload(self, context, path, join_depth=None):
        """Return True if loading along ``path`` should be skipped.

        This is the case when a post load is already registered for the
        effective path or, when no "loader" entry is present for the
        path, when ``join_depth`` is exceeded (if given) or the path
        already contains this strategy's mapper (if not).

        """
        prop = self.parent_property
        base_path = (
            context.compile_state.current_path or orm_util.PathRegistry.root
        )
        effective_path = base_path + path

        if loading.PostLoad.path_exists(context, effective_path, prop):
            return True

        path_w_prop = path[prop]
        effective_path_w_prop = effective_path[prop]

        if path_w_prop.contains(context.attributes, "loader"):
            # an explicit loader entry exists for this path; no cycle check
            return False

        if join_depth:
            return bool(effective_path_w_prop.length / 2 > join_depth)

        return bool(effective_path_w_prop.contains_mapper(self.mapper))

    def _immediateload_create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Delegate row processor creation to the "immediate" strategy."""
        immediate = self.parent_property._get_strategy(
            (("lazy", "immediate"),)
        )
        return immediate.create_row_processor(
            context,
            query_entity,
            path,
            loadopt,
            mapper,
            result,
            adapter,
            populators,
        )


@relationships.RelationshipProperty.strategy_for(lazy="immediate")
class ImmediateLoader(PostLoader):
    """Loader registered for ``lazy="immediate"``; it invokes the
    attribute's getter from a "delayed" populator."""

    __slots__ = ()

    def init_class_attribute(self, mapper):
        """Have the ``lazy="select"`` strategy set up the attribute."""
        select_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        select_strategy.init_class_attribute(mapper)

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Append a "delayed" populator that gets the attribute value."""
        if self._check_recursive_postload(context, path):
            # this will not emit SQL and will only emit for a many-to-one
            # "use get" load.   the "_RELATED" part means it may return
            # instance even if its expired, since this is a
            # mutually-recursive load operation.
            flags = attributes.PASSIVE_NO_FETCH_RELATED | attributes.NO_RAISE
        else:
            flags = attributes.PASSIVE_OFF | attributes.NO_RAISE

        def load_immediate(state, dict_, row):
            impl = state.get_impl(self.key)
            impl.get(state, dict_, flags)

        populators["delayed"].append((self.key, load_immediate))


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="subquery")
class SubqueryLoader(PostLoader):
    __slots__ = ("join_depth",)

    def __init__(self, parent, strategy_key):
        """Initialize and copy ``join_depth`` from the relationship."""
        super(SubqueryLoader, self).__init__(parent, strategy_key)
        relationship = self.parent_property
        self.join_depth = relationship.join_depth

    def init_class_attribute(self, mapper):
        """Have the ``lazy="select"`` strategy set up the attribute."""
        select_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        select_strategy.init_class_attribute(mapper)

    def _get_leftmost(
        self,
        orig_query_entity_index,
        subq_path,
        current_compile_state,
        is_root,
    ):
        """Determine the leftmost mapper, attributes and path of the load.

        :param orig_query_entity_index: index into
         ``current_compile_state._entities`` of the originating entity.
        :param subq_path: path object whose ``.path`` starts with the
         leftmost entity followed by the leftmost property.
        :param current_compile_state: compile state supplying the entity
         used to rebuild the path when ``is_root`` is true.
        :param is_root: whether the path has to be re-expressed in terms
         of the current compile state.
        :return: tuple of ``(leftmost_mapper, leftmost_attr,
         leftmost_prop, new_subq_path)``.

        """
        elements = subq_path.path
        first_mapper = orm_util._class_to_mapper(elements[0])
        first_prop = elements[1]

        # determine attributes of the leftmost mapper
        is_own_relationship = (
            self.parent.isa(first_mapper)
            and self.parent_property is first_prop
        )
        if is_own_relationship:
            leftmost_mapper = self.parent
            leftmost_prop = self.parent_property
        else:
            leftmost_mapper = first_mapper
            leftmost_prop = first_prop

        if not is_root:
            new_subq_path = subq_path
        else:
            # the subq_path is also coming from cached state, so when we
            # start building up this path, it has to also be converted to
            # be in terms of the current state. this is for the specific
            # case of the entity is an AliasedClass against a subquery
            # that's not otherwise going to adapt
            query_entity = current_compile_state._entities[
                orig_query_entity_index
            ]
            new_subq_path = query_entity.entity_zero._path_registry[
                leftmost_prop
            ]
            additional = len(elements) - len(new_subq_path)
            if additional:
                new_subq_path += path_registry.PathRegistry.coerce(
                    elements[-additional:]
                )

        leftmost_attr = [
            getattr(
                new_subq_path.path[0].entity,
                leftmost_mapper._columntoproperty[col].key,
            )
            for col in leftmost_prop.local_columns
        ]

        return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path

    def _generate_from_original_query(
        self,
        orig_compile_state,
        orig_query,
        leftmost_mapper,
        leftmost_attr,
        leftmost_relationship,
        orig_entity,
    ):
        """Embed the original query as a subquery of its key columns.

        The original query is copied, reduced to select only
        ``leftmost_attr``, optionally made DISTINCT, turned into a
        subquery and wrapped in an ``AliasedClass`` of
        ``leftmost_mapper``, which is returned.

        ``orig_entity`` is accepted but not used in this method.

        """
        # reformat the original query
        # to look only for significant columns
        cloned = orig_query._clone().correlate(None)

        # LEGACY: make a Query back from the select() !!
        # This suits at least two legacy cases:
        # 1. applications which expect before_compile() to be called
        #    below when we run .subquery() on this query (Keystone)
        # 2. applications which are doing subqueryload with complex
        #    from_self() queries, as query.subquery() / .statement
        #    has to do the full compile context for multiply-nested
        #    from_self() (Neutron) - see test_subqload_from_self
        #    for demo.
        q = query.Query.__new__(query.Query)
        q.__dict__.update(cloned.__dict__)

        # set the query's "FROM" list explicitly to what the
        # FROM list would be in any case, as we will be limiting
        # the columns in the SELECT list which may no longer include
        # all entities mentioned in things like WHERE, JOIN, etc.
        if not q._from_obj:
            q._enable_assertions = False
            from_entities = {
                desc["entity"]
                for desc in _column_descriptions(
                    orig_query, compile_state=orig_compile_state
                )
                if desc["entity"] is not None
            }
            q.select_from.non_generative(q, *from_entities)

        # select from the identity columns of the outer (specifically, these
        # are the 'local_cols' of the property).  This will remove other
        # columns from the query that might suggest the right entity which is
        # why we do set select_from above.   The attributes we have are
        # coerced and adapted using the original query's adapter, which is
        # needed only for the case of adapting a subclass column to
        # that of a polymorphic selectable, e.g. we have
        # Engineer.primary_language and the entity is Person.  All other
        # adaptations, e.g. from_self, select_entity_from(), will occur
        # within the new query when it compiles, as the compile_state we are
        # using here is only a partial one.  If the subqueryload is from a
        # with_polymorphic() or other aliased() object, left_attr will already
        # be the correct attributes so no adaptation is needed.
        coerced_attrs = [
            sql.coercions.expect(sql.roles.ColumnsClauseRole, attr)
            for attr in leftmost_attr
        ]
        target_cols = orig_compile_state._adapt_col_list(
            coerced_attrs,
            orig_compile_state._get_current_adapter(),
        )
        q._raw_columns = target_cols

        distinct_target_key = leftmost_relationship.distinct_target_key

        if distinct_target_key is True:
            q._distinct = True
        elif distinct_target_key is None:
            # if target_cols refer to a non-primary key or only
            # part of a composite primary key, set the q as distinct
            tables = {col.table for col in target_cols}
            if any(
                not set(target_cols).issuperset(table.primary_key)
                for table in tables
            ):
                q._distinct = True

        # don't need ORDER BY if no limit/offset
        if not q._has_row_limiting_clause:
            q._order_by_clauses = ()

        if q._distinct is True and q._order_by_clauses:
            # the logic to automatically add the order by columns to the query
            # when distinct is True is deprecated in the query
            to_add = sql_util.expand_column_list_from_order_by(
                target_cols, q._order_by_clauses
            )
            if to_add:
                q._set_entities(target_cols + to_add)

        # the original query now becomes a subquery
        # which we'll join onto.
        # LEGACY: as "q" is a Query, the before_compile() event is invoked
        # here.
        labeled = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
        embed_q = labeled.subquery()
        return orm_util.AliasedClass(
            leftmost_mapper, embed_q, use_mapper_path=True
        )

    def _prep_for_joins(self, left_alias, subq_path):
        # figure out what's being joined.  a.k.a. the fun part
        to_join = []
        pairs = list(subq_path.pairs())

        for i, (mapper, prop) in enumerate(pairs):
            if i > 0:
                # look at the previous mapper in the chain -
                # if it is as or more specific than this prop's
                # mapper, use that instead.
                # note we have an assumption here that
                # the non-first element is always going to be a mapper,
                # not an AliasedClass

                prev_mapper = pairs[i - 1][1].mapper
                to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
            else:
                to_append = mapper

            to_join.append((to_append, prop.key))

        # determine the immediate parent class we are joining from,
        # which needs to be aliased.

        if len(to_join) < 2:
            # in the case of a one level eager load, this is the
            # leftmost "left_alias".
            parent_alias = left_alias
        else:
            info = inspect(to_join[-1][0])
            if info.is_aliased_class:
                parent_alias = info.entity
            else:
                # alias a plain mapper as we may be
                # joining multiple times
                parent_alias = orm_util.AliasedClass(
                    info.entity, use_mapper_path=True
                )

        local_cols = self.parent_property.local_columns

        local_attr = [
            getattr(parent_alias, self.parent._columntoproperty[c].key)
            for c in local_cols
        ]
        return to_join, local_attr, parent_alias

    def _apply_joins(
        self, q, to_join, left_alias, parent_alias, effective_entity
    ):

        ltj = len(to_join)
        if ltj == 1:
            to_join = [
                getattr(left_alias, to_join[0][1]).of_type(effective_entity)
            ]
        elif ltj == 2:
            to_join = [
                getattr(left_alias, to_join[0][1]).of_type(parent_alias),
                getattr(parent_alias, to_join[-1][1]).of_type(
                    effective_entity
                ),
            ]
        elif ltj > 2:
            middle = [
                (
                    orm_util.AliasedClass(item[0])
                    if not inspect(item[0]).is_aliased_class
                    else item[0].entity,
                    item[1],
                )
                for item in to_join[1:-1]
            ]
            inner = []

            while middle:
                item = middle.pop(0)
                attr = getattr(item[0], item[1])
                if middle:
                    attr = attr.of_type(middle[0][0])
                else:
                    attr = attr.of_type(parent_alias)

                inner.append(attr)

            to_join = (
                [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
                + inner
                + [
                    getattr(parent_alias, to_join[-1][1]).of_type(
                        effective_entity
                    )
                ]
            )

        for attr in to_join:
            q = q.join(attr)

        return q

    def _setup_options(
        self,
        context,
        q,
        subq_path,
        rewritten_path,
        orig_query,
        effective_entity,
        loadopt,
    ):

        # note that because the subqueryload object
        # does not re-use the cached query, instead always making
        # use of the current invoked query, while we have two queries
        # here (orig and context.query), they are both non-cached
        # queries and we can transfer the options as is without
        # adjusting for new criteria.   Some work on #6881 / #6889
        # brought this into question.
        new_options = orig_query._with_options

        if loadopt and loadopt._extra_criteria:

            new_options += (
                orm_util.LoaderCriteriaOption(
                    self.entity,
                    loadopt._generate_extra_criteria(context),
                ),
            )

        # propagate loader options etc. to the new query.
        # these will fire relative to subq_path.
        q = q._with_current_path(rewritten_path)
        q = q.options(*new_options)

        return q

    def _setup_outermost_orderby(self, q):
        if self.parent_property.order_by:

            def _setup_outermost_orderby(compile_context):
                compile_context.eager_order_by += tuple(
                    util.to_list(self.parent_property.order_by)
                )

            q = q._add_context_option(
                _setup_outermost_orderby, self.parent_property
            )

        return q

    class _SubqCollections(object):
        """Given a :class:`_query.Query` used to emit the "subquery load",
        provide a load interface that executes the query at the
        first moment a value is needed.

        """

        __slots__ = (
            "session",
            "execution_options",
            "load_options",
            "params",
            "subq",
            "_data",
        )

        def __init__(self, context, subq):
            # avoid creating a cycle by storing context
            # even though that's preferable
            self.session = context.session
            self.execution_options = context.execution_options
            self.load_options = context.load_options
            self.params = context.params or {}
            self.subq = subq
            self._data = None

        def get(self, key, default):
            if self._data is None:
                self._load()
            return self._data.get(key, default)

        def _load(self):
            self._data = collections.defaultdict(list)

            q = self.subq
            assert q.session is None

            q = q.with_session(self.session)

            if self.load_options._populate_existing:
                q = q.populate_existing()
            # to work with baked query, the parameters may have been
            # updated since this query was created, so take these into account

            rows = list(q.params(self.params))
            for k, v in itertools.groupby(rows, lambda x: x[1:]):
                self._data[k].extend(vv[0] for vv in v)

        def loader(self, state, dict_, row):
            if self._data is None:
                self._load()

    def _setup_query_from_rowproc(
        self,
        context,
        query_entity,
        path,
        entity,
        loadopt,
        adapter,
    ):
        """Build the Query that performs the subquery load, or return None.

        ``None`` is returned when eager loads are disabled in the compile
        options, when the compile options indicate a refresh of existing
        state, or when the cycle check below decides not to load any
        further.  Otherwise ``context.loaders_require_buffering`` is set
        and a new ``Query`` against the effective entity is returned,
        joined to an alias of the original query.

        :param context: the load context; its ``compile_state``,
         ``query`` and ``load_options`` are consulted.
        :param query_entity: the originating entity; its index within
         ``compile_state._entities`` is looked up.
        :param path: path leading to the parent of this relationship.
        :param entity: passed through to
         ``_generate_from_original_query()``.
        :param loadopt: loader option in effect, passed to
         ``_setup_options()``.
        :param adapter: not used in this method.

        """
        compile_state = context.compile_state
        if (
            not compile_state.compile_options._enable_eagerloads
            or compile_state.compile_options._for_refresh_state
        ):
            return

        orig_query_entity_index = compile_state._entities.index(query_entity)
        context.loaders_require_buffering = True

        path = path[self.parent_property]

        # build up a path indicating the path from the leftmost
        # entity to the thing we're subquery loading.
        with_poly_entity = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            effective_entity = with_poly_entity
        else:
            effective_entity = self.entity

        # a subquery load that is nested inside another one finds the
        # paths accumulated so far in the execution options set up
        # further below; otherwise both paths start at the root.
        subq_path, rewritten_path = context.query._execution_options.get(
            ("subquery_paths", None),
            (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
        )
        is_root = subq_path is orm_util.PathRegistry.root
        subq_path = subq_path + path
        rewritten_path = rewritten_path + path

        # if not via query option, check for
        # a cycle
        # TODO: why is this here???  this is now handled
        # by the _check_recursive_postload call
        if not path.contains(compile_state.attributes, "loader"):
            if self.join_depth:
                if (
                    (
                        compile_state.current_path.length
                        if compile_state.current_path
                        else 0
                    )
                    + path.length
                ) / 2 > self.join_depth:
                    return
            elif subq_path.contains_mapper(self.mapper):
                return

        # use the current query being invoked, not the compile state
        # one.  this is so that we get the current parameters.  however,
        # it means we can't use the existing compile state, we have to make
        # a new one.    other approaches include possibly using the
        # compiled query but swapping the params, seems only marginally
        # less time spent but more complicated
        orig_query = context.query._execution_options.get(
            ("orig_query", SubqueryLoader), context.query
        )

        # make a new compile_state for the query that's probably cached, but
        # we're sort of undoing a bit of that caching :(
        compile_state_cls = ORMCompileState._get_plugin_class_for_plugin(
            orig_query, "orm"
        )

        if orig_query._is_lambda_element:
            # the warning is only emitted when the load options carry no
            # "_lazy_loaded_from" value
            if context.load_options._lazy_loaded_from is None:
                util.warn(
                    'subqueryloader for "%s" must invoke lambda callable '
                    "at %r in "
                    "order to produce a new query, decreasing the efficiency "
                    "of caching for this statement.  Consider using "
                    "selectinload() for more effective full-lambda caching"
                    % (self, orig_query)
                )
            orig_query = orig_query._resolved

        # this is the more "quick" version, however it's not clear how
        # much of this we need.    in particular I can't get a test to
        # fail if the "set_base_alias" is missing and not sure why that is.
        orig_compile_state = compile_state_cls._create_entities_collection(
            orig_query, legacy=False
        )

        # note that rewritten_path is replaced here by the path that
        # _get_leftmost() returns
        (
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            rewritten_path,
        ) = self._get_leftmost(
            orig_query_entity_index,
            rewritten_path,
            orig_compile_state,
            is_root,
        )

        # generate a new Query from the original, then
        # produce a subquery from it.
        left_alias = self._generate_from_original_query(
            orig_compile_state,
            orig_query,
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            entity,
        )

        # generate another Query that will join the
        # left alias to the target relationships.
        # basically doing a longhand
        # "from_self()".  (from_self() itself not quite industrial
        # strength enough for all contingencies...but very close)

        q = query.Query(effective_entity)

        # record the original query and the paths built so far on the new
        # query; these are the same keys read back near the top of this
        # method.
        q._execution_options = q._execution_options.union(
            {
                ("orig_query", SubqueryLoader): orig_query,
                ("subquery_paths", None): (subq_path, rewritten_path),
            }
        )

        q = q._set_enable_single_crit(False)
        to_join, local_attr, parent_alias = self._prep_for_joins(
            left_alias, subq_path
        )

        q = q.add_columns(*local_attr)
        q = self._apply_joins(
            q, to_join, left_alias, parent_alias, effective_entity
        )

        q = self._setup_options(
            context,
            q,
            subq_path,
            rewritten_path,
            orig_query,
            effective_entity,
            loadopt,
        )
        q = self._setup_outermost_orderby(q)

        return q

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Set up populators that fill this attribute from a subquery.

        Falls back to the "immediate" strategy when ``context`` has a
        refresh state, and does nothing for recursive post loads, for
        compile states that are not ``ORMSelectCompileState``, for paths
        not ending at this strategy's parent, or when no subquery could
        be built.

        :raises sqlalchemy.exc.InvalidRequestError: if the attribute
         implementation does not support population.

        """
        if context.refresh_state:
            return self._immediateload_create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )

        # the subqueryloader does a similar check in setup_query() unlike
        # the other post loaders, however we have this here for consistency
        if self._check_recursive_postload(context, path, self.join_depth):
            return None

        if not isinstance(context.compile_state, ORMSelectCompileState):
            # issue 7505 - subqueryload() in 1.3 and previous would silently
            # degrade for from_statement() without warning. this behavior
            # is restored here
            return None

        if not self.parent.class_manager[self.key].impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        # a little dance here as the "path" is still something that only
        # semi-tracks the exact series of things we are loading, still not
        # telling us about with_polymorphic() and stuff like that when it's at
        # the root..  the initial MapperEntity is more accurate for this case.
        if len(path) == 1:
            loaded_from = query_entity.entity_zero
        else:
            loaded_from = path[-1]

        if not orm_util._entity_isa(loaded_from, self.parent):
            return None

        subq = self._setup_query_from_rowproc(
            context,
            query_entity,
            path,
            path[-1],
            loadopt,
            adapter,
        )
        if subq is None:
            return None

        assert subq.session is None

        prop_path = path[self.parent_property]
        local_cols = self.parent_property.local_columns

        # cache the loaded collections in the context
        # so that inheriting mappers don't re-load when they
        # call upon create_row_processor again
        subq_collections = prop_path.get(context.attributes, "collections")
        if subq_collections is None:
            subq_collections = self._SubqCollections(context, subq)
            prop_path.set(context.attributes, "collections", subq_collections)

        if adapter:
            local_cols = [adapter.columns[col] for col in local_cols]

        create_loader = (
            self._create_collection_loader
            if self.uselist
            else self._create_scalar_loader
        )
        create_loader(
            context, result, subq_collections, local_cols, populators
        )

    def _create_collection_loader(
        self, context, result, collections, local_cols, populators
    ):
        tuple_getter = result._tuple_getter(local_cols)

        def load_collection_from_subq(state, dict_, row):
            collection = collections.get(tuple_getter(row), ())
            state.get_impl(self.key).set_committed_value(
                state, dict_, collection
            )

        def load_collection_from_subq_existing_row(state, dict_, row):
            if self.key not in dict_:
                load_collection_from_subq(state, dict_, row)

        populators["new"].append((self.key, load_collection_from_subq))
        populators["existing"].append(
            (self.key, load_collection_from_subq_existing_row)
        )

        if context.invoke_all_eagers:
            populators["eager"].append((self.key, collections.loader))

    def _create_scalar_loader(
        self, context, result, collections, local_cols, populators
    ):
        tuple_getter = result._tuple_getter(local_cols)

        def load_scalar_from_subq(state, dict_, row):
            collection = collections.get(tuple_getter(row), (None,))
            if len(collection) > 1:
                util.warn(
                    "Multiple rows returned with "
                    "uselist=False for eagerly-loaded attribute '%s' " % self
                )

            scalar = collection[0]
            state.get_impl(self.key).set_committed_value(state, dict_, scalar)

        def load_scalar_from_subq_existing_row(state, dict_, row):
            if self.key not in dict_:
                load_scalar_from_subq(state, dict_, row)

        populators["new"].append((self.key, load_scalar_from_subq))
        populators["existing"].append(
            (self.key, load_scalar_from_subq_existing_row)
        )
        if context.invoke_all_eagers:
            populators["eager"].append((self.key, collections.loader))


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="joined")
@relationships.RelationshipProperty.strategy_for(lazy=False)
class JoinedLoader(AbstractRelationshipLoader):
    """Provide loading behavior for a :class:`.RelationshipProperty`
    using joined eager loading.

    """

    __slots__ = "join_depth", "_aliased_class_pool"

    def __init__(self, parent, strategy_key):
        """Initialize the strategy.

        ``parent`` and ``strategy_key`` are handed to the base constructor;
        ``join_depth`` is then copied from the relationship property and the
        pool of reusable aliased classes starts out empty.

        """
        super(JoinedLoader, self).__init__(parent, strategy_key)
        # filled on demand by _gen_pooled_aliased_class()
        self._aliased_class_pool = []
        self.join_depth = self.parent_property.join_depth

    def init_class_attribute(self, mapper):
        """Delegate class-level attribute setup for ``mapper`` to the
        ``lazy="select"`` strategy of the same relationship property.

        """
        lazy_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        lazy_strategy.init_class_attribute(mapper)

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        parentmapper=None,
        chained_from_outerjoin=False,
        **kwargs
    ):
        """Set up joined eager loading of this relationship for a statement.

        Returns without doing anything when eager loads are disabled in the
        compile options.  When no alias-based loader option applies and the
        path carries no "loader" entry, it also returns early if
        ``path.length / 2`` exceeds a configured ``join_depth`` or, with no
        ``join_depth``, if the target mapper is already contained in the
        path.  Otherwise the row adapter / eager join is arranged for and
        the target entity's columns are set up through
        ``loading._setup_entity_query()``.

        :raises sqlalchemy.exc.InvalidRequestError: if a with_polymorphic
         entity is registered for the path and ``None`` is found among
         ``compile_state.secondary_columns`` after setup.

        """

        if not compile_state.compile_options._enable_eagerloads:
            return

        if self.uselist:
            compile_state.multi_row_eager_loaders = True

        path = path[self.parent_property]

        if loadopt:
            user_defined_adapter = self._init_user_defined_eager_proc(
                loadopt, compile_state, compile_state.attributes
            )
        else:
            user_defined_adapter = False

        if user_defined_adapter is False:
            # not directed by an alias-based query option; check for
            # a cycle unless a "loader" entry exists for this path
            if not path.contains(compile_state.attributes, "loader"):
                if self.join_depth:
                    if path.length / 2 > self.join_depth:
                        return
                elif path.contains_mapper(self.mapper):
                    return

            row_adapter_setup = self._generate_row_adapter(
                compile_state,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                parentmapper,
                chained_from_outerjoin,
            )
            (
                clauses,
                adapter,
                add_to_collection,
                chained_from_outerjoin,
            ) = row_adapter_setup
        else:
            user_adapter_setup = self._setup_query_on_user_defined_adapter(
                compile_state,
                query_entity,
                path,
                adapter,
                user_defined_adapter,
            )
            clauses, adapter, add_to_collection = user_adapter_setup

        with_poly_entity = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        )
        with_polymorphic = (
            inspect(with_poly_entity).with_polymorphic_mappers
            if with_poly_entity is not None
            else None
        )

        path = path[self.entity]

        loading._setup_entity_query(
            compile_state,
            self.mapper,
            query_entity,
            path,
            clauses,
            add_to_collection,
            with_polymorphic=with_polymorphic,
            parentmapper=self.mapper,
            chained_from_outerjoin=chained_from_outerjoin,
        )

        if with_poly_entity is None:
            return

        if None in set(compile_state.secondary_columns):
            raise sa_exc.InvalidRequestError(
                "Detected unaliased columns when generating joined "
                "load.  Make sure to use aliased=True or flat=True "
                "when using joined loading with with_polymorphic()."
            )

    def _init_user_defined_eager_proc(
        self, loadopt, compile_state, target_attributes
    ):

        # check if the opt applies at all
        if "eager_from_alias" not in loadopt.local_opts:
            # nope
            return False

        path = loadopt.path.parent

        # the option applies.  check if the "user_defined_eager_row_processor"
        # has been built up.
        adapter = path.get(
            compile_state.attributes, "user_defined_eager_row_processor", False
        )
        if adapter is not False:
            # just return it
            return adapter

        # otherwise figure it out.
        alias = loadopt.local_opts["eager_from_alias"]
        root_mapper, prop = path[-2:]

        if alias is not None:
            if isinstance(alias, str):
                alias = prop.target.alias(alias)
            adapter = sql_util.ColumnAdapter(
                alias, equivalents=prop.mapper._equivalent_columns
            )
        else:
            if path.contains(
                compile_state.attributes, "path_with_polymorphic"
            ):
                with_poly_entity = path.get(
                    compile_state.attributes, "path_with_polymorphic"
                )
                adapter = orm_util.ORMAdapter(
                    with_poly_entity,
                    equivalents=prop.mapper._equivalent_columns,
                )
            else:
                adapter = compile_state._polymorphic_adapters.get(
                    prop.mapper, None
                )
        path.set(
            target_attributes,
            "user_defined_eager_row_processor",
            adapter,
        )

        return adapter

    def _setup_query_on_user_defined_adapter(
        self, context, entity, path, adapter, user_defined_adapter
    ):

        # apply some more wrapping to the "user defined adapter"
        # if we are setting up the query for SQL render.
        adapter = entity._get_entity_clauses(context)

        if adapter and user_defined_adapter:
            user_defined_adapter = user_defined_adapter.wrap(adapter)
            path.set(
                context.attributes,
                "user_defined_eager_row_processor",
                user_defined_adapter,
            )
        elif adapter:
            user_defined_adapter = adapter
            path.set(
                context.attributes,
                "user_defined_eager_row_processor",
                user_defined_adapter,
            )

        add_to_collection = context.primary_columns
        return user_defined_adapter, adapter, add_to_collection

    def _gen_pooled_aliased_class(self, context):
        """Return an ``AliasedClass`` of the target mapper for one
        appearance of this loader within the query tracked by ``context``.

        """
        # a local pool of AliasedClass objects is kept and re-used; one
        # unique AliasedClass is needed per query per appearance of our
        # entity in the query.
        entity_insp = inspect(self.entity)
        alt_selectable = (
            entity_insp.selectable if entity_insp.is_aliased_class else None
        )

        # per-context counter of how many times this loader was seen
        attrs = context.attributes
        key = ("joinedloader_ac", self)
        idx = attrs[key] + 1 if key in attrs else 0
        attrs[key] = idx

        pool = self._aliased_class_pool
        if idx >= len(pool):
            if alt_selectable is not None:
                alias = alt_selectable._anonymous_fromclause(flat=True)
            else:
                alias = None

            to_adapt = orm_util.AliasedClass(
                self.mapper,
                alias=alias,
                flat=True,
                use_mapper_path=True,
            )

            # load up the .columns collection on the Alias() before
            # the object becomes shared among threads.  this prevents
            # races for column identities.
            inspect(to_adapt).selectable.c
            pool.append(to_adapt)

        return pool[idx]

    def _generate_row_adapter(
        self,
        compile_state,
        entity,
        path,
        loadopt,
        adapter,
        column_collection,
        parentmapper,
        chained_from_outerjoin,
    ):
        """Produce the ORM adapter for the join target and queue the join.

        An entry consisting of ``_create_eager_join`` and its arguments is
        appended to ``compile_state.create_eager_joins``, and the adapter is
        stored on ``path`` under "eager_row_processor".

        :return: tuple of ``(clauses, adapter,
         compile_state.secondary_columns, chained_from_outerjoin)``, where
         ``chained_from_outerjoin`` is ``True`` if this join is not an
         inner join, else the value passed in.

        """
        # adapt to the with_polymorphic entity registered for the path if
        # there is one, else to an aliased class from our pool
        to_adapt = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        ) or self._gen_pooled_aliased_class(compile_state)

        clauses = inspect(to_adapt)._memo(
            ("joinedloader_ormadapter", self),
            orm_util.ORMAdapter,
            to_adapt,
            equivalents=self.mapper._equivalent_columns,
            adapt_required=True,
            allow_label_resolve=False,
            anonymize_labels=True,
        )

        assert clauses.aliased_class is not None

        # the loader option, when given, may override the relationship's
        # own innerjoin setting
        innerjoin = self.parent_property.innerjoin
        if loadopt is not None:
            innerjoin = loadopt.local_opts.get("innerjoin", innerjoin)

        if not innerjoin:
            # if this is an outer join, all non-nested eager joins from
            # this path must also be outer joins
            chained_from_outerjoin = True

        extra_criteria = loadopt._extra_criteria if loadopt else ()
        eager_join_spec = (
            self._create_eager_join,
            entity,
            path,
            adapter,
            parentmapper,
            clauses,
            innerjoin,
            chained_from_outerjoin,
            extra_criteria,
        )
        compile_state.create_eager_joins.append(eager_join_spec)

        add_to_collection = compile_state.secondary_columns
        path.set(compile_state.attributes, "eager_row_processor", clauses)

        return clauses, adapter, add_to_collection, chained_from_outerjoin

    def _create_eager_join(
        self,
        compile_state,
        query_entity,
        path,
        adapter,
        parentmapper,
        clauses,
        innerjoin,
        chained_from_outerjoin,
        extra_criteria,
    ):
        """Build the eager join to this relationship's target and record it
        in ``compile_state.eager_joins``.

        ``_generate_row_adapter()`` queues this method, together with every
        argument after ``compile_state``, in
        ``compile_state.create_eager_joins``.

        :param compile_state: compile state whose ``eager_joins``,
         ``from_clauses``, ``primary_columns`` and ``eager_order_by`` are
         read and/or updated.
        :param query_entity: the query entity being joined from.
        :param path: loader path; only consulted when the join is spliced
         into an existing join tree.
        :param adapter: optional adapter for the parent side of the join;
         when given, the ON clause is looked up on the adapted entity.
        :param parentmapper: mapper of the parent side, or ``None`` in which
         case ``query_entity.mapper`` is used.
        :param clauses: adapter whose ``aliased_class`` is the join target.
        :param innerjoin: falsy for an outer join; a truthy value, possibly
         the string "unnested", for an inner join.
        :param chained_from_outerjoin: whether this join is chained from an
         outer join.
        :param extra_criteria: additional criteria passed to the join.
        :raises sqlalchemy.exc.InvalidRequestError: if more than one FROM
         clause of the statement matches the query entity's selectable.

        """
        if parentmapper is None:
            localparent = query_entity.mapper
        else:
            localparent = parentmapper

        # whether or not the Query will wrap the selectable in a subquery,
        # and then attach eager load joins to that (i.e., in the case of
        # LIMIT/OFFSET etc.)
        should_nest_selectable = (
            compile_state.multi_row_eager_loaders
            and compile_state._should_nest_selectable
        )

        query_entity_key = None

        if (
            query_entity not in compile_state.eager_joins
            and not should_nest_selectable
            and compile_state.from_clauses
        ):

            indexes = sql_util.find_left_clause_that_matches_given(
                compile_state.from_clauses, query_entity.selectable
            )

            if len(indexes) > 1:
                # for the eager load case, I can't reproduce this right
                # now.   For query.join() I can.
                raise sa_exc.InvalidRequestError(
                    "Can't identify which query entity in which to joined "
                    "eager load from.   Please use an exact match when "
                    "specifying the join path."
                )

            if indexes:
                clause = compile_state.from_clauses[indexes[0]]
                # join to an existing FROM clause on the query.
                # key it to its list index in the eager_joins dict.
                # Query._compile_context will adapt as needed and
                # append to the FROM clause of the select().
                query_entity_key, default_towrap = indexes[0], clause

        if query_entity_key is None:
            # no existing FROM clause was selected above; key the join on
            # the query entity itself and join from its selectable.
            query_entity_key, default_towrap = (
                query_entity,
                query_entity.selectable,
            )

        # continue from a join already recorded under this key, if any;
        # otherwise the default left side chosen above is recorded and used.
        towrap = compile_state.eager_joins.setdefault(
            query_entity_key, default_towrap
        )

        if adapter:
            if getattr(adapter, "aliased_class", None):
                # joining from an adapted entity.  The adapted entity
                # might be a "with_polymorphic", so resolve that to our
                # specific mapper's entity before looking for our attribute
                # name on it.
                efm = inspect(adapter.aliased_class)._entity_for_mapper(
                    localparent
                    if localparent.isa(self.parent)
                    else self.parent
                )

                # look for our attribute on the adapted entity, else fall back
                # to our straight property
                onclause = getattr(efm.entity, self.key, self.parent_property)
            else:
                # the adapter has no aliased class; build one for the parent
                # mapper against the adapter's selectable and look up our
                # attribute on it, again falling back to the plain property.
                onclause = getattr(
                    orm_util.AliasedClass(
                        self.parent, adapter.selectable, use_mapper_path=True
                    ),
                    self.key,
                    self.parent_property,
                )

        else:
            onclause = self.parent_property

        assert clauses.aliased_class is not None

        # the join is spliced into the existing join tree only for an inner
        # join other than "unnested" that is chained from an outer join,
        # where the entity does not itself represent an outer join; in
        # every other case it is attached on the outside.
        attach_on_outside = (
            not chained_from_outerjoin
            or not innerjoin
            or innerjoin == "unnested"
            or query_entity.entity_zero.represents_outer_join
        )

        # add criteria registered in global_attributes for our mapper to
        # the criteria passed in, limited to those that propagate to loaders.
        extra_join_criteria = extra_criteria
        additional_entity_criteria = compile_state.global_attributes.get(
            ("additional_entity_criteria", self.mapper), ()
        )
        if additional_entity_criteria:
            extra_join_criteria += tuple(
                ae._resolve_where_criteria(self.mapper)
                for ae in additional_entity_criteria
                if ae.propagate_to_loaders
            )

        if attach_on_outside:
            # this is the "classic" eager join case.
            eagerjoin = orm_util._ORMJoin(
                towrap,
                clauses.aliased_class,
                onclause,
                isouter=not innerjoin
                or query_entity.entity_zero.represents_outer_join
                or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
                _left_memo=self.parent,
                _right_memo=self.mapper,
                _extra_criteria=extra_join_criteria,
            )
        else:
            # all other cases are innerjoin=='nested' approach
            eagerjoin = self._splice_nested_inner_join(
                path, towrap, clauses, onclause, extra_join_criteria
            )

        compile_state.eager_joins[query_entity_key] = eagerjoin

        # send a hint to the Query as to where it may "splice" this join
        eagerjoin.stop_on = query_entity.selectable

        if not parentmapper:
            # for parentclause that is the non-eager end of the join,
            # ensure all the parent cols in the primaryjoin are actually
            # in the
            # columns clause (i.e. are not deferred), so that aliasing applied
            # by the Query propagates those columns outward.
            # This has the effect
            # of "undefering" those columns.
            for col in sql_util._find_columns(
                self.parent_property.primaryjoin
            ):
                if localparent.persist_selectable.c.contains_column(col):
                    if adapter:
                        col = adapter.columns[col]
                    compile_state._append_dedupe_col_collection(
                        col, compile_state.primary_columns
                    )

        if self.parent_property.order_by:
            # run the relationship's ORDER BY through the join's target
            # adapter and add the result to the eager ORDER BY.
            compile_state.eager_order_by += tuple(
                (eagerjoin._target_adapter.copy_and_process)(
                    util.to_list(self.parent_property.order_by)
                )
            )

    def _splice_nested_inner_join(
        self, path, join_obj, clauses, onclause, extra_criteria, splicing=False
    ):
        """Splice a new inner join to ``clauses.aliased_class`` into an
        existing join tree.

        Recursively searches ``join_obj``, right side first and then left
        side, for the non-join element whose memo (the ``_right_memo`` /
        ``_left_memo`` of the join enclosing it) is ``path[-2]``.  The new
        join is attached to that element with ``isouter=False`` and the
        enclosing joins are rebuilt around it.

        :param path: loader path; ``path[-2]`` identifies the element to
         join from and ``path[-1].mapper`` is used as the right memo of
         the new join.
        :param join_obj: the join, grouping, or plain FROM element that is
         currently being examined.
        :param clauses: adapter whose ``aliased_class`` is the join target.
        :param onclause: ON clause for the new join.
        :param extra_criteria: additional criteria passed to the new join.
        :param splicing: ``False`` on the outermost call; on recursive
         calls, the memo of the side of the enclosing join being descended
         into.
        :return: the rebuilt join, or ``None`` on a recursive call when the
         element to join from was not found beneath ``join_obj``.

        """

        if splicing is False:
            # first call is always handed a join object
            # from the outside
            assert isinstance(join_obj, orm_util._ORMJoin)
        elif isinstance(join_obj, sql.selectable.FromGrouping):
            # look through the grouping at the element it wraps
            return self._splice_nested_inner_join(
                path,
                join_obj.element,
                clauses,
                onclause,
                extra_criteria,
                splicing,
            )
        elif not isinstance(join_obj, orm_util._ORMJoin):
            # reached a non-join element; it is the place to join from
            # only if its memo is the entity at path[-2]
            if path[-2] is splicing:
                return orm_util._ORMJoin(
                    join_obj,
                    clauses.aliased_class,
                    onclause,
                    isouter=False,
                    _left_memo=splicing,
                    _right_memo=path[-1].mapper,
                    _extra_criteria=extra_criteria,
                )
            else:
                # only reached on recursive calls (splicing is not False)
                return None

        # try the right side of the join first
        target_join = self._splice_nested_inner_join(
            path,
            join_obj.right,
            clauses,
            onclause,
            extra_criteria,
            join_obj._right_memo,
        )
        if target_join is None:
            # not found on the right; try the left side
            right_splice = False
            target_join = self._splice_nested_inner_join(
                path,
                join_obj.left,
                clauses,
                onclause,
                extra_criteria,
                join_obj._left_memo,
            )
            if target_join is None:
                # should only return None when recursively called,
                # e.g. splicing==True
                assert (
                    splicing is not False
                ), "assertion failed attempting to produce joined eager loads"
                return None
        else:
            right_splice = True

        if right_splice:
            # for a right splice, attempt to flatten out
            # a JOIN b JOIN c JOIN .. to avoid needless
            # parenthesis nesting
            if not join_obj.isouter and not target_join.isouter:
                eagerjoin = join_obj._splice_into_center(target_join)
            else:
                eagerjoin = orm_util._ORMJoin(
                    join_obj.left,
                    target_join,
                    join_obj.onclause,
                    isouter=join_obj.isouter,
                    _left_memo=join_obj._left_memo,
                )
        else:
            # left splice: rebuild this join with the spliced result as
            # its left side
            eagerjoin = orm_util._ORMJoin(
                target_join,
                join_obj.right,
                join_obj.onclause,
                isouter=join_obj.isouter,
                _right_memo=join_obj._right_memo,
            )

        # carry the target adapter of the spliced join up to the rebuilt one
        eagerjoin._target_adapter = target_join._target_adapter
        return eagerjoin

    def _create_eager_adapter(self, context, result, adapter, path, loadopt):
        compile_state = context.compile_state

        user_defined_adapter = (
            self._init_user_defined_eager_proc(
                loadopt, compile_state, context.attributes
            )
            if loadopt
            else False
        )

        if user_defined_adapter is not False:
            decorator = user_defined_adapter
            # user defined eagerloads are part of the "primary"
            # portion of the load.
            # the adapters applied to the Query should be honored.
            if compile_state.compound_eager_adapter and decorator:
                decorator = decorator.wrap(
                    compile_state.compound_eager_adapter
                )
            elif compile_state.compound_eager_adapter:
                decorator = compile_state.compound_eager_adapter
        else:
            decorator = path.get(
                compile_state.attributes, "eager_row_processor"
            )
            if decorator is None:
                return False

        if self.mapper._result_has_identity_key(result, decorator):
            return decorator
        else:
            # no identity key - don't return a row
            # processor, will cause a degrade to lazy
            return False

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Register row populators for this relationship.

        When an eager adapter can be established for the result, populators
        that read the related object(s) out of each row are added to
        ``populators``; otherwise the work is handed to the
        ``lazy="select"`` strategy of the same relationship.

        :raises sqlalchemy.exc.InvalidRequestError: if the attribute's
         implementation does not support population.

        """
        attr_impl = self.parent.class_manager[self.key].impl
        if not attr_impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        if self.uselist:
            context.loaders_require_uniquing = True

        our_path = path[self.parent_property]

        eager_adapter = self._create_eager_adapter(
            context, result, adapter, our_path, loadopt
        )

        if eager_adapter is False:
            # no usable adapter; let the lazy loader set up the attribute
            lazy_strategy = self.parent_property._get_strategy(
                (("lazy", "select"),)
            )
            lazy_strategy.create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )
            return

        key = self.key

        _instance = loading._instance_processor(
            query_entity,
            self.mapper,
            context,
            result,
            our_path[self.entity],
            eager_adapter,
        )

        if self.uselist:
            self._create_collection_loader(context, key, _instance, populators)
        else:
            self._create_scalar_loader(context, key, _instance, populators)

    def _create_collection_loader(self, context, key, _instance, populators):
        def load_collection_from_joined_new_row(state, dict_, row):
            # note this must unconditionally clear out any existing collection.
            # an existing collection would be present only in the case of
            # populate_existing().
            collection = attributes.init_state_collection(state, dict_, key)
            result_list = util.UniqueAppender(
                collection, "append_without_event"
            )
            context.attributes[(state, key)] = result_list
            inst = _instance(row)
            if inst is not None:
                result_list.append(inst)

        def load_collection_from_joined_existing_row(state, dict_, row):
            if (state, key) in context.attributes:
                result_list = context.attributes[(state, key)]
            else:
                # appender_key can be absent from context.attributes
                # with isnew=False when self-referential eager loading
                # is used; the same instance may be present in two
                # distinct sets of result columns
                collection = attributes.init_state_collection(
                    state, dict_, key
                )
                result_list = util.UniqueAppender(
                    collection, "append_without_event"
                )
                context.attributes[(state, key)] = result_list
            inst = _instance(row)
            if inst is not None:
                result_list.append(inst)

        def load_collection_from_joined_exec(state, dict_, row):
            _instance(row)

        populators["new"].append(
            (self.key, load_collection_from_joined_new_row)
        )
        populators["existing"].append(
            (self.key, load_collection_from_joined_existing_row)
        )
        if context.invoke_all_eagers:
            populators["eager"].append(
                (self.key, load_collection_from_joined_exec)
            )

    def _create_scalar_loader(self, context, key, _instance, populators):
        def load_scalar_from_joined_new_row(state, dict_, row):
            # set a scalar object instance directly on the parent
            # object, bypassing InstrumentedAttribute event handlers.
            dict_[key] = _instance(row)

        def load_scalar_from_joined_existing_row(state, dict_, row):
            # call _instance on the row, even though the object has
            # been created, so that we further descend into properties
            existing = _instance(row)

            # conflicting value already loaded, this shouldn't happen
            if key in dict_:
                if existing is not dict_[key]:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for eagerly-loaded attribute '%s' "
                        % self
                    )
            else:
                # this case is when one row has multiple loads of the
                # same entity (e.g. via aliasing), one has an attribute
                # that the other doesn't.
                dict_[key] = existing

        def load_scalar_from_joined_exec(state, dict_, row):
            _instance(row)

        populators["new"].append((self.key, load_scalar_from_joined_new_row))
        populators["existing"].append(
            (self.key, load_scalar_from_joined_existing_row)
        )
        if context.invoke_all_eagers:
            populators["eager"].append(
                (self.key, load_scalar_from_joined_exec)
            )


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="selectin")
class SelectInLoader(PostLoader, util.MemoizedSlots):
    __slots__ = (
        "join_depth",
        "omit_join",
        "_parent_alias",
        "_query_info",
        "_fallback_query_info",
    )

    query_info = collections.namedtuple(
        "queryinfo",
        [
            "load_only_child",
            "load_with_join",
            "in_expr",
            "pk_cols",
            "zero_idx",
            "child_lookup_cols",
        ],
    )

    _chunksize = 500

    def __init__(self, parent, strategy_key):
        """Initialize the strategy and precompute its query information.

        ``omit_join`` is taken from the relationship property when that is
        set to something other than ``None``; otherwise it is derived from
        the ``lazy="select"`` strategy.  ``_query_info`` is then built for
        the chosen mode; for many-to-one with ``omit_join``, a join-based
        ``_fallback_query_info`` is built as well.

        """
        super(SelectInLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.join_depth = prop.join_depth
        is_m2o = prop.direction is interfaces.MANYTOONE

        omit_join = prop.omit_join
        if omit_join is None:
            # not configured explicitly; work it out from the lazy loader
            lazyloader = prop._get_strategy((("lazy", "select"),))
            if is_m2o:
                omit_join = lazyloader.use_get
            else:
                omit_join = self.parent._get_clause[0].compare(
                    lazyloader._rev_lazywhere,
                    use_proxies=True,
                    compare_keys=False,
                    equivalents=self.parent._equivalent_columns,
                )
        self.omit_join = omit_join

        if not omit_join:
            self._query_info = self._init_for_join()
        elif is_m2o:
            self._query_info = self._init_for_omit_join_m2o()
            self._fallback_query_info = self._init_for_join()
        else:
            self._query_info = self._init_for_omit_join()

    def _init_for_omit_join(self):
        pk_to_fk = dict(
            self.parent_property._join_condition.local_remote_pairs
        )
        pk_to_fk.update(
            (equiv, pk_to_fk[k])
            for k in list(pk_to_fk)
            for equiv in self.parent._equivalent_columns.get(k, ())
        )

        pk_cols = fk_cols = [
            pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
        ]
        if len(fk_cols) > 1:
            in_expr = sql.tuple_(*fk_cols)
            zero_idx = False
        else:
            in_expr = fk_cols[0]
            zero_idx = True

        return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)

    def _init_for_omit_join_m2o(self):
        pk_cols = self.mapper.primary_key
        if len(pk_cols) > 1:
            in_expr = sql.tuple_(*pk_cols)
            zero_idx = False
        else:
            in_expr = pk_cols[0]
            zero_idx = True

        lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
        lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]

        return self.query_info(
            True, False, in_expr, pk_cols, zero_idx, lookup_cols
        )

    def _init_for_join(self):
        """Build the ``query_info`` for join mode.

        An alias of the parent class is created and kept in
        ``_parent_alias``; the IN expression and ``pk_cols`` are the
        parent's primary key columns adapted to that alias.

        """
        self._parent_alias = aliased(self.parent.class_)
        adapt_to_alias = inspect(self._parent_alias)._adapt_element
        pk_cols = [adapt_to_alias(col) for col in self.parent.primary_key]

        zero_idx = len(pk_cols) <= 1
        in_expr = pk_cols[0] if zero_idx else sql.tuple_(*pk_cols)

        return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)

    def init_class_attribute(self, mapper):
        """Delegate class-level attribute setup for ``mapper`` to the
        ``lazy="select"`` strategy of the same relationship property.

        """
        lazy_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        lazy_strategy.init_class_attribute(mapper)

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Arrange for this relationship to be loaded after the rows of the
        current result have been processed.

        When ``context.refresh_state`` is set, the work is handed to
        ``_immediateload_create_row_processor()`` and its result returned.
        Nothing is done when ``_check_recursive_postload()`` reports true
        or when the entity at the end of the path is not of the parent
        mapper.  Otherwise ``_load_for_path`` is registered through
        ``loading.PostLoad.callable_for_path()``.

        :raises sqlalchemy.exc.InvalidRequestError: if the attribute's
         implementation does not support population.

        """

        if context.refresh_state:
            return self._immediateload_create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )

        if self._check_recursive_postload(context, path, self.join_depth):
            return

        attr_impl = self.parent.class_manager[self.key].impl
        if not attr_impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        # a little dance here as the "path" is still something that only
        # semi-tracks the exact series of things we are loading, still not
        # telling us about with_polymorphic() and stuff like that when it's at
        # the root..  the initial MapperEntity is more accurate for this case.
        if len(path) == 1:
            leading_entity = query_entity.entity_zero
        else:
            leading_entity = path[-1]
        if not orm_util._entity_isa(leading_entity, self.parent):
            return

        base_path = (
            context.compile_state.current_path or orm_util.PathRegistry.root
        )
        selectin_path = base_path + path

        # use the with_polymorphic entity registered for our property on
        # the path, if any, as the entity that will be loaded
        with_poly_entity = path[self.parent_property].get(
            context.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is None:
            effective_entity = self.entity
        else:
            effective_entity = inspect(with_poly_entity)

        loading.PostLoad.callable_for_path(
            context,
            selectin_path,
            self.parent,
            self.parent_property,
            self._load_for_path,
            effective_entity,
            loadopt,
        )

    def _load_for_path(
        self, context, path, states, load_only, effective_entity, loadopt
    ):
        """Build the "SELECT .. IN" statement for this relationship and
        hand it off to the helper that populates the given parent states.

        This is the callable that is passed to
        ``loading.PostLoad.callable_for_path()``.

        :param context: query context of the load in progress; its
         ``compile_state.select_statement``, ``query`` and
         ``populate_existing`` attributes are consulted here.
        :param path: path object which is indexed by
         ``self.parent_property`` to produce the ``_current_path`` compile
         option of the statement.
        :param states: iterable of ``(state, overwrite)`` pairs for the
         parent objects to be populated.
        :param load_only: when truthy and not containing ``self.key``, the
         method returns without doing anything.
        :param effective_entity: the entity the statement selects; adapted
         against when it reports ``is_aliased_class``.
        :param loadopt: loader option for this path, or a falsy value; when
         it carries ``_extra_criteria`` a ``LoaderCriteriaOption`` is added
         to the statement.
        :return: ``None``; the result is applied to the states by
         ``_load_via_child()`` or ``_load_via_parent()``.
        """
        if load_only and self.key not in load_only:
            return

        query_info = self._query_info

        if query_info.load_only_child:
            # group the parent states by the tuple of values they hold for
            # query_info.child_lookup_cols; PASSIVE_NO_FETCH is used so that
            # reading those values does not itself emit a load.
            our_states = collections.defaultdict(list)
            none_states = []

            mapper = self.parent

            for state, overwrite in states:
                state_dict = state.dict
                related_ident = tuple(
                    mapper._get_state_attr_by_column(
                        state,
                        state_dict,
                        lk,
                        passive=attributes.PASSIVE_NO_FETCH,
                    )
                    for lk in query_info.child_lookup_cols
                )
                # if the loaded parent objects do not have the foreign key
                # to the related item loaded, then degrade into the joined
                # version of selectinload
                if attributes.PASSIVE_NO_RESULT in related_ident:
                    query_info = self._fallback_query_info
                    break

                # organize states into lists keyed to particular foreign
                # key values.
                if None not in related_ident:
                    our_states[related_ident].append(
                        (state, state_dict, overwrite)
                    )
                else:
                    # For FK values that have None, add them to a
                    # separate collection that will be populated separately
                    none_states.append((state, state_dict, overwrite))

        # note the above conditional may have changed query_info
        if not query_info.load_only_child:
            # replaces whatever was partially collected above with a flat
            # list of (identity, state, dict, overwrite) entries, where the
            # identity is taken from state.key[1].
            our_states = [
                (state.key[1], state, state.dict, overwrite)
                for state, overwrite in states
            ]

        pk_cols = query_info.pk_cols
        in_expr = query_info.in_expr

        if not query_info.load_with_join:
            # in "omit join" mode, the primary key column and the
            # "in" expression are in terms of the related entity.  So
            # if the related entity is polymorphic or otherwise aliased,
            # we need to adapt our "pk_cols" and "in_expr" to that
            # entity.   in non-"omit join" mode, these are against the
            # parent entity and do not need adaption.
            if effective_entity.is_aliased_class:
                pk_cols = [
                    effective_entity._adapt_element(col) for col in pk_cols
                ]
                in_expr = effective_entity._adapt_element(in_expr)

        # the statement selects two things per row: a "pk" Bundle of the
        # lookup columns, and the related entity itself.
        bundle_ent = orm_util.Bundle("pk", *pk_cols)
        bundle_sql = bundle_ent.__clause_element__()

        entity_sql = effective_entity.__clause_element__()
        q = Select._create_raw_select(
            _raw_columns=[bundle_sql, entity_sql],
            _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
            _compile_options=ORMCompileState.default_compile_options,
            _propagate_attrs={
                "compile_state_plugin": "orm",
                "plugin_subject": effective_entity,
            },
        )

        if not query_info.load_with_join:
            # the Bundle we have in the "omit_join" case is against raw, non
            # annotated columns, so to ensure the Query knows its primary
            # entity, we add it explicitly.  If we made the Bundle against
            # annotated columns, we hit a performance issue in this specific
            # case, which is detailed in issue #4347.
            q = q.select_from(effective_entity)
        else:
            # in the non-omit_join case, the Bundle is against the annotated/
            # mapped column of the parent entity, but the #4347 issue does not
            # occur in this case.
            q = q.select_from(self._parent_alias).join(
                getattr(self._parent_alias, self.parent_property.key).of_type(
                    effective_entity
                )
            )

        # the "primary_keys" parameter is supplied per chunk by
        # _load_via_child() / _load_via_parent() at execution time.
        q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))

        # a test which exercises what these comments talk about is
        # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
        #
        # effective_entity above is given to us in terms of the cached
        # statement, namely this one:
        orig_query = context.compile_state.select_statement

        # the actual statement that was requested is this one:
        #  context_query = context.query
        #
        # that's not the cached one, however.  So while it is of the identical
        # structure, if it has entities like AliasedInsp, which we get from
        # aliased() or with_polymorphic(), the AliasedInsp will likely be a
        # different object identity each time, and will not match up
        # hashing-wise to the corresponding AliasedInsp that's in the
        # cached query, meaning it won't match on paths and loader lookups
        # and loaders like this one will be skipped if it is used in options.
        #
        # Now we want to transfer loader options from the parent query to the
        # "selectinload" query we're about to run.   Which query do we transfer
        # the options from?  We use the cached query, because the options in
        # that query will be in terms of the effective entity we were just
        # handed.
        #
        # But now the selectinload query we are running is *also*
        # cached.  What if it's cached and running from some previous iteration
        # of that AliasedInsp?  Well in that case it will also use the previous
        # iteration of the loader options.   If the query expires and
        # gets generated again, it will be handed the current effective_entity
        # and the current _with_options, again in terms of whatever
        # compile_state.select_statement happens to be right now, so the
        # query will still be internally consistent and loader callables
        # will be correctly invoked.

        effective_path = path[self.parent_property]

        if orig_query is context.query:
            # the cached statement and the requested statement are the same
            # object; all of its options are carried over as is.
            options = new_options = orig_query._with_options
            user_defined_options = []
        else:
            options = orig_query._with_options

            # propagate compile state options from the original query,
            # updating their "extra_criteria" as necessary.
            # note this will create a different cache key than
            # "orig" options if extra_criteria is present, because the copy
            # of extra_criteria will have different boundparam than that of
            # the QueryableAttribute in the path

            new_options = [
                orig_opt._adjust_for_extra_criteria(context)
                if orig_opt._is_strategy_option
                else orig_opt
                for orig_opt in options
                if orig_opt._is_compile_state or orig_opt._is_legacy_option
            ]

            # propagate user defined options from the current query
            user_defined_options = [
                opt
                for opt in context.query._with_options
                if not opt._is_compile_state and not opt._is_legacy_option
            ]

        if loadopt and loadopt._extra_criteria:
            # criteria attached to the loader option itself are applied to
            # the related entity of the statement being built.
            new_options += (
                orm_util.LoaderCriteriaOption(
                    effective_entity,
                    loadopt._generate_extra_criteria(context),
                ),
            )

        q = q.options(*new_options)._update_compile_options(
            {"_current_path": effective_path}
        )
        if user_defined_options:
            q = q.options(*user_defined_options)

        if context.populate_existing:
            q = q.execution_options(populate_existing=True)

        if self.parent_property.order_by:
            if not query_info.load_with_join:
                # no join to the parent: order directly by the relationship's
                # order_by, adapted to the effective entity when aliased.
                eager_order_by = self.parent_property.order_by
                if effective_entity.is_aliased_class:
                    eager_order_by = [
                        effective_entity._adapt_element(elem)
                        for elem in eager_order_by
                    ]
                q = q.order_by(*eager_order_by)
            else:

                # joined form: the ordering is added to the compile
                # context's eager_order_by through a context option rather
                # than through q.order_by().
                def _setup_outermost_orderby(compile_context):
                    compile_context.eager_order_by += tuple(
                        util.to_list(self.parent_property.order_by)
                    )

                q = q._add_context_option(
                    _setup_outermost_orderby, self.parent_property
                )

        # our_states is a dict keyed by related identity in the
        # load_only_child case, and a list of 4-tuples otherwise.
        if query_info.load_only_child:
            self._load_via_child(
                our_states, none_states, query_info, q, context
            )
        else:
            self._load_via_parent(our_states, query_info, q, context)

    def _load_via_child(self, our_states, none_states, query_info, q, context):
        uselist = self.uselist

        # this sort is really for the benefit of the unit tests
        our_keys = sorted(our_states)
        while our_keys:
            chunk = our_keys[0 : self._chunksize]
            our_keys = our_keys[self._chunksize :]
            data = {
                k: v
                for k, v in context.session.execute(
                    q,
                    params={
                        "primary_keys": [
                            key[0] if query_info.zero_idx else key
                            for key in chunk
                        ]
                    },
                ).unique()
            }

            for key in chunk:
                # for a real foreign key and no concurrent changes to the
                # DB while running this method, "key" is always present in
                # data.  However, for primaryjoins without real foreign keys
                # a non-None primaryjoin condition may still refer to no
                # related object.
                related_obj = data.get(key, None)
                for state, dict_, overwrite in our_states[key]:
                    if not overwrite and self.key in dict_:
                        continue

                    state.get_impl(self.key).set_committed_value(
                        state,
                        dict_,
                        related_obj if not uselist else [related_obj],
                    )
        # populate none states with empty value / collection
        for state, dict_, overwrite in none_states:
            if not overwrite and self.key in dict_:
                continue

            # note it's OK if this is a uselist=True attribute, the empty
            # collection will be populated
            state.get_impl(self.key).set_committed_value(state, dict_, None)

    def _load_via_parent(self, our_states, query_info, q, context):
        uselist = self.uselist
        _empty_result = () if uselist else None

        while our_states:
            chunk = our_states[0 : self._chunksize]
            our_states = our_states[self._chunksize :]

            primary_keys = [
                key[0] if query_info.zero_idx else key
                for key, state, state_dict, overwrite in chunk
            ]

            data = collections.defaultdict(list)
            for k, v in itertools.groupby(
                context.session.execute(
                    q, params={"primary_keys": primary_keys}
                ).unique(),
                lambda x: x[0],
            ):
                data[k].extend(vv[1] for vv in v)

            for key, state, state_dict, overwrite in chunk:

                if not overwrite and self.key in state_dict:
                    continue

                collection = data.get(key, _empty_result)

                if not uselist and collection:
                    if len(collection) > 1:
                        util.warn(
                            "Multiple rows returned with "
                            "uselist=False for eagerly-loaded "
                            "attribute '%s' " % self
                        )
                    state.get_impl(self.key).set_committed_value(
                        state, state_dict, collection[0]
                    )
                else:
                    # note that empty tuple set on uselist=False sets the
                    # value to None
                    state.get_impl(self.key).set_committed_value(
                        state, state_dict, collection
                    )


def single_parent_validator(desc, prop):
    """Attach "append" and "set" listeners to ``desc`` that reject a value
    which already has a parent via the initiating attribute.

    :param desc: the event target the listeners are registered on.
    :param prop: the property being guarded; only events whose initiator
     key equals ``prop.key`` are checked.
    :raises sqlalchemy.exc.InvalidRequestError: from within the listeners,
     when the incoming value already has a parent and is not the same
     object as the value being replaced.
    """

    def _do_check(state, value, oldvalue, initiator):
        # nothing to verify for a None value, or for an event that was
        # initiated by some other attribute
        if value is None or initiator.key != prop.key:
            return value

        value_state = attributes.instance_state(value)
        if initiator.hasparent(value_state) and oldvalue is not value:
            message = (
                "Instance %s is already associated with an instance "
                "of %s via its %s attribute, and is only allowed a "
                "single parent."
                % (orm_util.instance_str(value), state.class_, prop)
            )
            raise sa_exc.InvalidRequestError(message, code="bbf1")
        return value

    def append(state, value, initiator):
        # an append has no previous value to compare against
        return _do_check(state, value, None, initiator)

    def set_(state, value, oldvalue, initiator):
        return _do_check(state, value, oldvalue, initiator)

    for event_name, listener in (("append", append), ("set", set_)):
        event.listen(
            desc,
            event_name,
            listener,
            raw=True,
            retval=True,
            active_history=True,
        )
